package com.tonels.AsyncHandlerInterceptor.controller;

import com.google.common.collect.Lists;
import com.tonels.AsyncHandlerInterceptor.task.TaskExecutor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@RestController
@Slf4j
public class MyWebController {
//    private static ExecutorService es;
//    static {
//        final int count = Runtime.getRuntime().availableProcessors();
//        log.info("可用cppu : {}", count);
//        es = Executors.newFixedThreadPool(count * 2);
//    }

    @Resource
    private TaskExecutor taskExecutor;

    @GetMapping("/")
    public Callable<String> handleTestRequest() {
        System.out.println("controller#handler called. Thread: " +
                Thread.currentThread().getName());
        Callable<String> callable = new Callable<String>() {
            @Override
            public String call() throws Exception {
                System.out.println("controller-callable#async task started. Thread: " +
                        Thread.currentThread().getName());
                Thread.sleep(3000);
                System.out.println("controller-callable#async task finished");
                return "async result";
            }
        };

        System.out.println("controller#handler finished");
        return callable;
    }

    @GetMapping("/testml")
    public String t1(String t) throws InterruptedException {
        final long arg2 = System.currentTimeMillis();
        log.info("接入{},开始{}", t, arg2);
        Thread.sleep(5000);
        log.info("接入{},结束{}", t, System.currentTimeMillis() - arg2);
        return "ss";
    }

    @GetMapping("/es")
    public String t2(String t) throws InterruptedException {
        final long arg2 = System.currentTimeMillis();
        log.info("接入{},开始{}", t, arg2);
        Thread.sleep(5000);
        log.info("接入{},结束{}", t, System.currentTimeMillis() - arg2);
        return "ss";
    }

    /**
     * 测试多线程
     *
     * @return
     */
    @GetMapping("/test")
    public String mulRequest(String task) {
        log.info("当前参数：{}", task);
//        for (int i = 0; i < 10; i++) {
//            es.execute(new Task1(task + i));
//        }
////        es.shutdown();
////        try {
////            es.awaitTermination(3, TimeUnit.SECONDS);
////        } catch (InterruptedException e) {
////            log.error("请求{},shutdown异常", task);
////        }
//        ConcurrentUtils.stop(es);
        return task;
    }


    @GetMapping("/springThread")
    public String springRequest(String task) throws InterruptedException, ExecutionException {
        log.info("当前参数：{}", task);

        List<String> list = Lists.newArrayList();
        CompletableFuture<String> stringCompletableFuture = taskExecutor.task1(task);
        return stringCompletableFuture.get();
    }

    /**
     * todo 为什么 list size 一直是零？
     *
     * @param task
     * @return
     * @throws InterruptedException
     * @throws ExecutionException
     */
    @GetMapping("/forThread")
    public String forThread(String task) {
        log.info("当前参数：{}", task);
        List<String> list = Lists.newCopyOnWriteArrayList();
        List<Integer> collect = IntStream.range(0, 20).boxed().parallel().collect(Collectors.toList());
        collect.stream().parallel().map(String::valueOf).forEach(e -> {
            list.add(e + Math.random());
            log.info("当前线程池：{}，List Size{}", Thread.currentThread().getName(), list.size());
        });
        log.info("List 大小{}", list.size());
        return "success";
    }

    @GetMapping("/forThread2")
    public String forThread2(String task) {
        log.info("当前参数：{}", task);
        List<String> list = Lists.newCopyOnWriteArrayList();
        List<Integer> collect = IntStream.range(0, 100).boxed().parallel().collect(Collectors.toList());
        collect.stream().map(String::valueOf).forEach(e -> {
            try {
                taskExecutor.task1(e);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        });
        log.info("List 大小= {}", list.size());
        return "success";
    }


    /**
     * main
     * new Thread -> mul Thread -> new Thread
     *
     * @param task
     * @return
     */
    @GetMapping("/forThread3")
    public String forThread3(String task) throws InterruptedException {
        log.info("当前参数：{}", task);
        List<String> collect = IntStream.range(0, 100).boxed().parallel().map(String::valueOf).collect(Collectors.toList());

        taskExecutor.task2("another",collect);
        return "Controller异步返回";
    }

    @GetMapping("/forThread3_1")
    public String forThread3_1(String task) throws InterruptedException {
        log.info("当前参数：{}", task);
        List<String> collect = IntStream.range(0, 100).boxed().parallel().map(String::valueOf).collect(Collectors.toList());

        new Thread(new AnotherTask("Another",collect)).start();
        return "Controller异步返回";
    }


    @GetMapping("/forThread3_2")
    public String forThread3_2(String task) throws InterruptedException {
        log.info("当前参数：{}", task);
        List<String> collect = IntStream.range(0, 100).boxed().parallel().map(String::valueOf).collect(Collectors.toList());
        anotherTask2(collect);
        return "Controller异步返回";
    }

    /**
     * Another thread
     */
    private void anotherTask2(List<String> integers) throws InterruptedException {
        taskExecutor.task2("another",integers);
    }

    //  ============================================== 线程定义 ======================================================
    private class Task1 implements Runnable {
        private String taskName;

        public Task1(String s) {
            this.taskName = s;
        }

        @Override
        public void run() {
            final String name = Thread.currentThread().getName();
            log.info("当前线程{},执行任务{}", name, taskName);
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                log.error("异常", e);
            }
            log.info("当前线程{},任务{}完毕", name, taskName);
        }
    }

    /**
     * Another thread
     */
    @Data
    private class AnotherTask implements Runnable {

        private List<String> integers;
        private String name;

        public AnotherTask(String another, List<String> integer) {
            this.integers = integer;
            this.name = another;
        }

        @Override
        public void run() {
            long startTime = System.currentTimeMillis();
            List<CompletableFuture<String>> result = Lists.newCopyOnWriteArrayList();
            integers.stream().forEach(e -> {
                try {
                    CompletableFuture<String> stringCompletableFuture = taskExecutor.task1(e);
                    result.add(stringCompletableFuture);
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            });
            CompletableFuture[] cfs = result.toArray(new CompletableFuture[0]);
            CompletableFuture.allOf(cfs).join();
            log.info("Thread：{}，list size = {},总耗时 {} ms", this.getName(), result.size(), System.currentTimeMillis() - startTime);
            // http 数据传输
            System.out.println("任务合并后，继续 http 发送");
        }
    }




}